Talend×LambdaでサーバーレスのETL処理をする
はじめに
自分が所属しているデータインテグレーション部では、TalendというETLツールをよく使っています。
Talendの使い方としては、S3からデータを取得してデータ加工処理を行い、再度S3へアップロードしてRedshiftにデータをロードするということをよく行っています。Talendで作成したジョブはEC2上に配置して定期的に実行し、S3にデータがあれば処理をするということをしております。
Talendのジョブを実行する場合、jarファイルにして出力しそれを実行します。なのでTalendで作成したjarファイルをLambdaから呼び出せればジョブを実行できるなと考え、今回やってみました。
開発環境
Talendでジョブを作成する
Talendでジョブを作成します。今回はS3にファイルがアップロードされたのをトリガーにS3からCSVファイルを取得して、データ変換処理を行いRedshiftに登録するまでをやっていたいと思います。
今回Talendで使用するコンポーネントは以下の5つになります。
ジョブ全体としてはこのようになります。
簡単にTalendのジョブの説明をしていきます。
ジョブを実行すると上記の上から順番(同列にあるものは左から右に)に処理がされていきます。
①tS3GetでS3からCSVファイルを取得します。設定する項目としてはアクセスキー、シークレットキー、Region、バケット、Key、出力先のファイルパスになります。Lambdaは一時作業ディレクトリとして /tmp フォルダが誰でも読み書きできる領域となっているためS3からダウンロードしたファイルはそこに保存するようにしています。
②tFileInputDelimitedでS3からダウンロードしたファイルを読み込みます。ファイル名/ストリームに読み込むファイルを指定します。フィールド区切り記号ですが今回はカンマ区切りのデータにしているのでカンマを指定します。またヘッダ行があるため読み込みをスキップするのにヘッダーに1を指定して1行スキップしています。
③読み込んだCSVデータをtMapで変換します。ここでは簡単に読み込んだデータの最後に作成日という形でnew Date()を使って日付を追加しています。
④tFileOutputDelimitedで変換したデータをファイル出力します。tS3Getと同様に/tmp フォルダに出力するようにしています。
⑤tS3PutでS3にファイルをアップロードします。設定項目としては①のtS3Getと同じになります。
⑥tRedshiftBulkExecでS3にアップロードしたファイルをRedshiftにロードします。ここではロード対象のS3の設定、登録するRedshiftを指定します。
S3から取得するcsvファイルはこんな感じです。
id,name 1,hoge 2,fuga 3,piyo
ジョブを出力する
作成したTalendのジョブを出力します。作成したジョブを選択して右クリックから「ジョブをエクスポート」を選択し、出力します。出力したZIPファイルを解凍するとジョブ名と同じフォルダが作成されています。そのフォルダ内にあるjarファイルが今回作成したコンポーネントをビルドしたものになります。またlibフォルダにはそのビルドしたjarに必要なライブラリが格納されています。今回作成したジョブは「etl_sample」というジョブ名なのでetl_sample_0.1.jarが作成されています。
EclipseでLambdaプロジェクトを作成する
EclipseにAWS の開発用ツールをインストールします。メニュー->ヘルプ->新規ソフトウェアのインストールから作業対象に「http://aws.amazon.com/eclipse」を入力し、AWS Development ToolsからLambda Pluginをインストールします。
クレデンシャル情報はメニューのウィンドウ->設定->AWSツールキットから設定して下さい。
次に新規プロジェクトからAWS Lambda Java Projectを選択し、プロジェクト名、パッケージ名をを入力し「入力タイプ」が「S3 Event」になっていることを確認して完了ボタンをクリックします。
プロジェクトが作成できたので、先ほど出力したTalendのjarを追加します。プロジェクト直下にlibsフォルダを作成し、以下のjarファイルを配置しクラスパスを通します。
次にTalendで作成したetl_sample_0_1.jarを呼び出す実装をします。まずetl_sample_0_1.jarの中身を見てみるとmainメソッドは以下のようになっています。
public static void main(String[] args) { final etl_sample etl_sampleClass = new etl_sample(); int exitCode = etl_sampleClass.runJobInTOS(args); System.exit(exitCode); }
見て分かるとおりmainメソッドはとてもシンプルです。LambdaプロジェクトのhandleRequestメソッドからも同じように呼び出しをします。
@Override public Object handleRequest(S3Event input, Context context) { context.getLogger().log("Start ETL"); final etl_sample etl_sampleClass = new etl_sample(); String[] args = {}; int exitCode = etl_sampleClass.runJobInTOS(args); context.getLogger().log("exitCode = " + exitCode); context.getLogger().log("End ETL"); return null; }
Lambdaに登録する
実装が終わったのでLambdaに登録します。プロジェクトを選択して右クリックから「Amazon Webサービス->Upload Function to AWS Lambda」を選択します。Regionを選択して、「Create a new Lambda Function」を選択し、適当なfunction名を入力します。「次へ」ボタンをクリックし、IAM Role、S3のバケットを選択して「完了」ボタンをクリックします。これでLambdaへの登録が完了しました。最後にManagement ConsoleからTriggerの設定をします。TriggerはETL処理対象のファイルがS3にアップロードされたらLambdaが動作するようにしています。
Redshiftにテーブルを作成する
CSVファイルをロードするテーブルを作成します。
create table sample1 ( id integer not null , name character varying(50) not null , created_at timestamp not null ) ;
実行してみる
では動かしてみます。S3のバケットにsample.csvファイルをアップロードします。Lambdaが実行され、S3バケットの同階層にTalendで変換されたファイルがアップロードされます。Lambdaのログを確認してみます。ログが出力され動作したことが分かります。
次にファイルをアップロードしたS3のフォルダを見てみます。変換処理されたファイルが存在していることが分かります。
最後にRedshiftのテーブルの中身を見てみます。CSVファイルのデータが登録されていることも確認できました。
まとめ
いかがだったでしょうか。無事LambdaからTalendで作成したジョブが呼び出せました。Lambdaでの実行なので制約はありますがTalendを使ってサーバーレスでETL処理ができるのはいいなと感じました。今回は以上です。